
//import main.java.LoginEvent;
//import main.java.LoginWarning;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;



//------------------------------------------------------------------------------------------------------------


public class FlinkLoginFail
{

    //------------------------------------------------------------------------------------------------------------

    public static class LoginEvent implements Serializable
    {
        private String userId;//用户ID
        private String ip;//登录IP
        private String type;//登录类型



        public LoginEvent()
        {
        }


        private LoginEvent(String userId, String ip, String type)
        {
            this.userId = userId;
            this.ip = ip;
            this.type = type;
        }
        // gets sets
    }

    //------------------------------------------------------------------------------------------------------------
    public  static class LoginWarning implements Serializable
    {
        private String userId;
        private String type;
        private String ip;

        public LoginWarning()
        {
        }

        private LoginWarning(String userId, String type, String ip)
        {
            this.userId = userId;
            this.type = type;
            this.ip = ip;
        }
    }







//    ----------------------------------------------------------------------------------------------

    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


//    数据输入
        DataStream<LoginEvent> loginEventStream = env.fromCollection(Arrays.asList(
                new LoginEvent("1", "192.168.0.1", "fail"),
                new LoginEvent("1", "192.168.0.2", "fail"),
                new LoginEvent("1", "192.168.0.3", "fail"),
                new LoginEvent("2", "192.168.10,10", "success")
        ));


//    设定规则
        Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("begin")
                .where(new IterativeCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent, Context context) throws Exception {
                        return loginEvent.type.equals("fail");
                    }
                })
                .next("next")
                .where(new IterativeCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent, Context context) throws Exception {
                        return loginEvent.type.equals("fail");
                    }
                })
                .within(Time.seconds(1));


        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(value -> value.userId), loginFailPattern);


        DataStream<LoginWarning> loginFailDataStream = patternStream.select(new PatternSelectFunction<LoginEvent, LoginWarning>() {
            @Override
            public LoginWarning select(Map<String, List<LoginEvent>> map) throws Exception {
                List<LoginEvent> first = map.get("begin");
                List<LoginEvent> second = map.get("next");
                return new LoginWarning(second.get(0).userId,second.get(0).ip, second.get(0).type);
            }
        });


        loginFailDataStream.map(eventobject->"userId:"+eventobject.userId+"\ntype:"+eventobject.type+"\nip:"+eventobject.ip).print();

        env.execute();


    }



}


//代碼來自:
//https://www.jianshu.com/p/a19e0d069bca
